package com.magupe.push.rpc;

import java.lang.reflect.Proxy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.magupe.push.common.codec.PacketDecoder;
import com.magupe.push.common.codec.PacketEncoder;
import com.magupe.push.common.codec.Spliter;
import com.magupe.push.common.protocol.request.RPCMessageRequestPacket;
import com.magupe.push.log.util.LogUtils;
import com.magupe.push.rpc.handler.RPCMessageResponseHandler;
import com.magupe.push.rpc.service.MessageService;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Client side of the push RPC channel.
 *
 * <p>{@link #createProxy(Class)} builds a dynamic proxy whose {@code sendMessage} call opens a
 * Netty connection to the configured {@code host}/{@code port}, writes the request packet and
 * returns whatever {@link RPCMessageResponseHandler#getResult()} yields. A new connection (and
 * event loop group) is created for every call and released when the call finishes.
 */
public class RPCClient {

	protected static Logger logger = LoggerFactory.getLogger(RPCClient.class);

	/** Maximum number of connection attempts made for a single call. */
	private static final int MAX_RETRY = 3;

	private String host;
	private String port;

	/**
	 * Creates a client without a target address. {@code host} and {@code port} stay {@code null},
	 * so a proxied {@code sendMessage} call fails when the port is parsed.
	 */
	public RPCClient(){

	}

	/**
	 * @param host remote host to connect to
	 * @param port remote port, as a decimal string (parsed with {@link Integer#parseInt(String)}
	 *             on every call)
	 */
	public RPCClient(String host, String port){
		this.host = host;
		this.port = port;
	}

	/**
	 * Creates a dynamic proxy for the given service interface.
	 *
	 * <p>Only the method named {@code sendMessage} is forwarded to the remote side; its first
	 * argument must be an {@link RPCMessageRequestPacket}. {@code equals}, {@code hashCode} and
	 * {@code toString} get identity-based implementations. Every other method returns
	 * {@code null}.
	 *
	 * @param messageService the interface to proxy; the resulting proxy is cast to
	 *                       {@link MessageService}, so it must be that interface or a sub-interface
	 * @return the proxy instance
	 * @throws IllegalArgumentException if {@code messageService} cannot be proxied
	 *                                  (see {@link Proxy#newProxyInstance})
	 */
	public MessageService createProxy(Class<?> messageService) {
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		Class<?>[] interfaces = new Class<?>[]{messageService};
		return (MessageService) Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
			// hashCode/equals/toString are routed through the handler as well. Returning null
			// for hashCode/equals would fail with a NullPointerException on unboxing.
			if (method.getDeclaringClass() == Object.class) {
				return invokeObjectMethod(proxy, method.getName(), args);
			}
			if ("sendMessage".equals(method.getName())) {
				return sendOverNewConnection((RPCMessageRequestPacket) args[0]);
			}
			return null;
		});
	}

	/** Identity-based implementations of the {@link Object} methods dispatched to the proxy. */
	private static Object invokeObjectMethod(Object proxy, String methodName, Object[] args) {
		switch (methodName) {
			case "equals":
				return proxy == args[0];
			case "hashCode":
				return System.identityHashCode(proxy);
			case "toString":
				return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
			default:
				return null;
		}
	}

	/**
	 * Connects to {@code host:port}, writes {@code packet} and returns the handler's result.
	 *
	 * <p>Declared {@code throws Throwable} because it runs inside
	 * {@code InvocationHandler.invoke} and must not narrow what the called helpers may throw.
	 *
	 * @throws NumberFormatException if {@code port} is not a valid integer
	 * @throws IllegalStateException if no connection could be established
	 */
	private Object sendOverNewConnection(RPCMessageRequestPacket packet) throws Throwable {
		RPCMessageResponseHandler handler = new RPCMessageResponseHandler();
		// A single channel is opened per call, so one event loop thread is sufficient.
		NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap
				.group(workerGroup)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
				.option(ChannelOption.SO_KEEPALIVE, true)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) {
						ch.pipeline().addLast(new Spliter());
						ch.pipeline().addLast(new PacketDecoder());
						ch.pipeline().addLast(handler);
						ch.pipeline().addLast(new PacketEncoder());
					}
				});

			ChannelFuture future = connectRetry(bootstrap, host, Integer.parseInt(port), MAX_RETRY);
			if (future == null) {
				// Fail this call only; terminating the whole JVM is not the client's decision.
				throw new IllegalStateException("Unable to connect to RPC server " + host + ":" + port);
			}
			future.channel().writeAndFlush(packet);

			LogUtils.processLog(future.channel(), packet, "RPC转发推送消息", false, null);

			// NOTE(review): releasing the event loop group right after this call assumes
			// getResult() blocks until the response has arrived - confirm against
			// RPCMessageResponseHandler.
			return handler.getResult();
		} finally {
			// Without this every call leaks the group's threads and selectors.
			workerGroup.shutdownGracefully();
		}
	}

	/**
	 * Connects synchronously, retrying on failure.
	 *
	 * @param bootstrap configured bootstrap used for every attempt
	 * @param host      remote host
	 * @param port      remote port
	 * @param retry     maximum number of attempts; at least one attempt is always made
	 * @return the completed connect future, or {@code null} if every attempt failed or the
	 *         calling thread was interrupted while waiting
	 */
	protected ChannelFuture connectRetry(Bootstrap bootstrap, String host, int port, int retry) {
		int attemptsLeft = retry;
		while (true) {
			try {
				return bootstrap.connect(host, port).sync();
			} catch (InterruptedException e) {
				// Preserve the interrupt for the caller and stop retrying.
				Thread.currentThread().interrupt();
				logger.error("Connect Netty Error", e);
				return null;
			} catch (Exception e) {
				attemptsLeft--;
				if (attemptsLeft <= 0) {
					logger.error("Connect Netty Error", e);
					return null;
				}
				logger.info("retry");
			}
		}
	}
}
